package com.uxsino.reactorq.clientserver;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.uxsino.reactorq.commons.IFlowItem;
import com.uxsino.reactorq.commons.QueueFlux;
import com.uxsino.reactorq.event.Event;

import reactor.core.Disposable;
import reactor.core.publisher.ConnectableFlux;

/**
 * Single-use request/reply client: sends one {@link Event} through the supplied
 * sender and collects the replies that arrive on the given {@link QueueFlux}.
 *
 * <p>Replies for which {@link IFlowItem#isInternal()} is {@code false} are appended
 * to an internal buffer and passed to the item handler. An internal reply whose
 * status is {@code END_SUCCESS} or {@code END_ERROR} terminates the exchange and
 * triggers the matching handler with the buffered results.
 *
 * <p>The reply subscription is created once in the constructor and released by
 * {@link #dispose()}, so an instance cannot receive replies again after it has
 * completed, timed out or been disposed.
 *
 * <p>The result buffer is a plain {@link ArrayList}; the same list instance is
 * handed to the handlers and returned by {@link #getResults()} without copying.
 *
 * @param <ResultType> type of the reply items
 */
public class JMSAsyncClient<ResultType extends IFlowItem> {
    private static final Logger logger = LoggerFactory.getLogger(JMSAsyncClient.class);

    /**
     * Shared timer for {@link #start(Event, long)} timeouts. The worker is a daemon
     * thread so that an idle timer never keeps the JVM from shutting down.
     */
    private static final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, task -> {
        Thread worker = new Thread(task, "jms-async-client-timeout");
        worker.setDaemon(true);
        return worker;
    });

    // Both fields are written by the timer thread, the message thread and the
    // caller's thread, hence volatile.
    private volatile ScheduledFuture<?> timeoutFuture;

    private volatile Disposable valueSubscription;

    private final Consumer<Event> eventSender;

    protected QueueFlux<ResultType> flux;

    protected List<ResultType> buffer = new ArrayList<>();

    // callback consumers
    protected Consumer<List<ResultType>> successHandler = null;

    protected Consumer<List<ResultType>> errorHandler = null;

    protected Consumer<ResultType> itemHandler = null;

    /** {@code true} between a successful start()/call() and completion, timeout or dispose(). */
    protected AtomicBoolean running = new AtomicBoolean(false);

    /**
     * Subscribes to the reply flux immediately; replies are ignored until
     * {@link #start(Event, long)} or {@link #call(Event, long)} marks the client as running.
     *
     * @param flux        source of reply items; its queue is used as the reply-to of the command
     * @param eventSender callback that actually sends the command
     */
    public JMSAsyncClient(QueueFlux<ResultType> flux, Consumer<Event> eventSender) {
        this.flux = flux;
        this.eventSender = eventSender;

        ConnectableFlux<ResultType> vflux = flux.publish();
        valueSubscription = vflux.subscribe(this::onMessage);
        // NOTE(review): this second, handler-less subscriber and the Disposable returned
        // by connect() are never released by dispose(); presumably intentional so the
        // upstream keeps draining - confirm against QueueFlux semantics.
        vflux.subscribe();
        vflux.connect();
    }

    /** Released once a terminal reply has been processed or the start() timeout has fired. */
    CountDownLatch latch = new CountDownLatch(1);

    /**
     * Handles one reply item. Ignored unless the client is running and
     * {@link #filterMessage(IFlowItem)} accepts the item.
     *
     * @param msg reply item received from the flux
     */
    protected void onMessage(ResultType msg) {
        if (!running.get() || !filterMessage(msg)) {
            return;
        }

        if (!msg.isInternal()) {
            buffer.add(msg);
            invokeSafely(itemHandler, msg, "exception dealing with query result item");
            return;
        }

        IFlowItem.STATUS status = msg.getStatus();
        switch (status) {
        case END_ERROR:
            dispose();
            invokeSafely(errorHandler, buffer, "exception dealing with error");
            latch.countDown();
            break;
        case END_SUCCESS:
            dispose();
            invokeSafely(successHandler, buffer, "exception dealing with query result");
            latch.countDown();
            break;
        default:
            // other internal statuses are not terminal and carry no payload for us
            break;
        }
    }

    /**
     * Hook for subclasses to drop replies that do not belong to this exchange.
     *
     * @param msg candidate reply
     * @return {@code true} to process the reply (the default), {@code false} to ignore it
     */
    public boolean filterMessage(ResultType msg) {
        return true;
    }

    /**
     * Sends the command without blocking. If no terminal reply has been processed within
     * {@code timeoutMillis}, the reply subscription is released and the error handler is
     * invoked with whatever has been buffered so far.
     *
     * <p>Does nothing except log a warning if the client is already running.
     *
     * @param cmd           command to send; its {@code replyTo} is set to the flux's queue
     * @param timeoutMillis timeout in milliseconds
     */
    public <T extends Event> void start(T cmd, long timeoutMillis) {
        if (running.getAndSet(true)) {
            logger.warn("already started.");
            return;
        }
        Runnable timeoutTask = this::onTimeout;
        timeoutFuture = scheduler.schedule(timeoutTask, timeoutMillis, TimeUnit.MILLISECONDS);

        cmd.replyTo = flux.getQueue();
        eventSender.accept(cmd);
    }

    /**
     * Sends the command and blocks until a terminal reply has been processed or the
     * timeout elapses. On timeout or interruption the client is disposed so that late
     * replies no longer reach the buffer; no handler is invoked in either case. When
     * interrupted, the calling thread's interrupt flag is restored.
     *
     * <p>Does nothing except log a warning if the client is already running.
     *
     * @param cmd           command to send; its {@code replyTo} is set to the flux's queue
     * @param timeoutMillis timeout in milliseconds
     */
    public <T extends Event> void call(T cmd, long timeoutMillis) {
        if (running.getAndSet(true)) {
            logger.warn("already started.");
            return;
        }
        cmd.replyTo = flux.getQueue();
        eventSender.accept(cmd);
        try {
            boolean completed = latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            if (!completed) {
                logger.warn("call timed out after {} ms", timeoutMillis);
                dispose();
            }
        } catch (InterruptedException e) {
            dispose();
            // keep the interruption visible to the caller instead of swallowing it
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Registers the callback invoked with the buffered results on {@code END_SUCCESS}.
     *
     * @return this client, for chaining
     */
    public JMSAsyncClient<ResultType> onSuccess(Consumer<List<ResultType>> action) {
        successHandler = action;
        return this;
    }

    /**
     * Registers the callback invoked with the buffered results on {@code END_ERROR}
     * or when the {@link #start(Event, long)} timeout fires.
     *
     * @return this client, for chaining
     */
    public JMSAsyncClient<ResultType> onError(Consumer<List<ResultType>> action) {
        errorHandler = action;
        return this;
    }

    /**
     * Registers the callback invoked for every non-internal reply item.
     *
     * @return this client, for chaining
     */
    public JMSAsyncClient<ResultType> onItem(Consumer<ResultType> action) {
        itemHandler = action;
        return this;
    }

    /**
     * Stops processing replies, cancels a pending timeout and releases the reply
     * subscription. Safe to call more than once.
     */
    public void dispose() {
        running.set(false);

        // Work on local snapshots: another thread may clear the fields concurrently.
        ScheduledFuture<?> timer = timeoutFuture;
        timeoutFuture = null;
        if (timer != null) {
            // No interruption: if the timeout task is already running it may be inside
            // the user's error handler, which must not be interrupted from here.
            timer.cancel(false);
        }

        Disposable subscription = valueSubscription;
        valueSubscription = null;
        if (subscription != null) {
            subscription.dispose();
        }
    }

    /**
     * @return the live buffer of non-internal replies received so far (not a copy)
     */
    public List<ResultType> getResults() {
        return buffer;
    }

    /**
     * Timeout task for {@link #start(Event, long)}. Only acts when the client is still
     * running, so it cannot report an error after a terminal reply was processed.
     */
    private void onTimeout() {
        if (!running.compareAndSet(true, false)) {
            return;
        }
        // We are executing inside the timer task, so the future is dropped, not cancelled.
        timeoutFuture = null;

        Disposable subscription = valueSubscription;
        valueSubscription = null;
        if (subscription != null) {
            subscription.dispose();
        }

        invokeSafely(errorHandler, buffer, "exception dealing with timeout");
        latch.countDown();
    }

    /** Invokes an optional callback, logging instead of propagating anything it throws. */
    private static <A> void invokeSafely(Consumer<A> handler, A argument, String failureMessage) {
        if (handler == null) {
            return;
        }
        try {
            handler.accept(argument);
        } catch (Exception e) {
            logger.error(failureMessage, e);
        }
    }

}
